Keep ChannelBrokerState in the database #2297
Conversation
Force-pushed from bdd3999 to 50b6267
```diff
 defp under_capacity?(state) do
-  count_active_contacts(state) < state.capacity
+  Queue.count_active_contacts(state.channel_id) < state.capacity
 end
```
This method may have a negative impact on performance: we're counting how many contacts are active for the channel right from the database, and we count every time we try to activate a contact.

If it ever becomes a problem, we could keep a counter in state for the number of active contacts, and manipulate it as we activate / increment / decrement / deactivate / reenqueue, instead of counting all the time. We'd only count from the database on `init/1`.
Yes, we definitely should initialise the count from the DB upon start, and then keep track in-memory. Adding an extra query for each respondent we try to contact may put a big load on the DB - does that query have an easy index, at least?

We can always test to be sure before optimising a bit too early.
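A minimal sketch of that optimization, with hypothetical function and field names: count once from the database on startup, then adjust the counter in memory as contacts come and go.

```elixir
# Hypothetical sketch: initialise the active count from the DB once,
# then keep it up to date in memory as contacts are (de)activated.
def init({channel_id, capacity}) do
  state = %{
    channel_id: channel_id,
    capacity: capacity,
    # the only DB count, performed on startup
    active_count: Queue.count_active_contacts(channel_id)
  }

  {:ok, state}
end

defp track_activation(state), do: %{state | active_count: state.active_count + 1}
defp track_deactivation(state), do: %{state | active_count: state.active_count - 1}

defp under_capacity?(state), do: state.active_count < state.capacity
```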
Yes, there are a couple of indexes:
- PRIMARY KEY (channel_id, respondent_id)
- INDEX (last_contact)

I tried to avoid aggregate queries everywhere, preferring the quick `Repo.exists?`, but the capacity check is the only request where I needed an actual value. I didn't implement the optimization because keeping a cache is easy, but keeping it correctly updated is another matter.
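For illustration, a sketch of the contrast between the two kinds of checks, with assumed field names and `import Ecto.Query` implied; "active" is assumed here to mean a contact with a `channel_state` set:

```elixir
# Cheap presence check: the database can stop at the first matching row.
def has_queued_message?(channel_id, respondent_id) do
  Queue
  |> where(channel_id: ^channel_id, respondent_id: ^respondent_id)
  |> Repo.exists?()
end

# The capacity check needs an actual number, so it has to aggregate.
def count_active_contacts(channel_id) do
  Queue
  |> where([q], q.channel_id == ^channel_id and not is_nil(q.channel_state))
  |> Repo.aggregate(:count)
end
```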
```elixir
query =
  from q in Queue,
    select: [:channel_id, :respondent_id, :channel_state],
    where: q.channel_id == ^state.channel_id and q.last_contact < ^last_contact
```
Hum, maybe we could `ORDER BY last_contact ASC` to process the oldest idle contacts first?
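Something like the following, reusing the query from the diff above (assuming `import Ecto.Query` as in the surrounding module):

```elixir
query =
  from q in Queue,
    select: [:channel_id, :respondent_id, :channel_state],
    where: q.channel_id == ^state.channel_id and q.last_contact < ^last_contact,
    # process the oldest idle contacts first
    order_by: [asc: q.last_contact]
```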
This solves the issue of long-lived transactions in Ask.Runtime.Session and Ask.Runtime.SurveyBroker that call into the ChannelBroker process, which then fails to checkout a database connection for its own usage.
Some tests will queue a new contact while there is already a contact for the same channel and respondent in the channel broker queue.
Force-pushed from 9e2dabf to 7a73ed0
I still have to finish reviewing the `channel_broker_state_test.exs` file.
```elixir
Repo.all(
  from r in "respondents",
    select: r.id,
    where: r.id in ^State.active_respondent_ids(state) and r.state == "active"
)
```
Should we somehow limit this query? Paginate and run multiple times? I'm thinking about full-scale surveys.
This is already limited to the channel capacity (e.g. 200), not the batch size (e.g. 20,000), since the query restricts to respondents that the channel broker considers active.

Maybe we could reverse the logic and return inactive respondents instead of active ones? We expect there won't be fewer active respondents than we think there are, so the reversed query should return few rows. Then we could paginate and clean up the respondents from the channel-broker queue in multiple passes. 🤔
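A sketch of that reversal, with a hypothetical chunk size: query the ids the broker considers active in chunks and collect only the ones the respondents table no longer marks active, so each query stays small.

```elixir
# Hypothetical sketch: query in chunks and return the inactive ids only.
defp inactive_respondent_ids(state) do
  state
  |> State.active_respondent_ids()
  |> Enum.chunk_every(500)
  |> Enum.flat_map(fn ids ->
    Repo.all(
      from r in "respondents",
        select: r.id,
        where: r.id in ^ids and r.state != "active"
    )
  end)
end
```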
```elixir
channel_id: state.channel_id,
respondent_id: respondent.id,
queued_at: SystemTime.time().now,
priority: priority,
```
I guess we can't make the same query choose the highest priority between the new one and the one that was already in the DB (in case of an `UPDATE`), right?

Doesn't really matter, I think, since this shouldn't go through the `UPDATE` path - but just thinking out loud.
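For what it's worth, a sketch of how the upsert itself might pick the higher priority, assuming Ecto's `on_conflict` accepts an update query here (on MySQL it would compile to `ON DUPLICATE KEY UPDATE`):

```elixir
# Sketch: keep the higher of the existing and the incoming priority.
keep_highest_priority =
  from q in Queue,
    update: [set: [priority: fragment("GREATEST(?, VALUES(priority))", q.priority)]]

Repo.insert!(contact, on_conflict: keep_highest_priority)
```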
```elixir
# FIXME: this may take a while, and during that time the channel broker
# won't process its mailbox, if it ever becomes a problem, we might
# consider:
```
Yes. If this could block the channel broker for a long time, let's make it run in small bursts (`LIMIT 50`, and enqueue a new call for this function in the process' mailbox?)
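A sketch of that burst pattern, with hypothetical message and helper names:

```elixir
# Process one small batch, then re-enqueue ourselves behind whatever is
# already waiting in the mailbox, so the broker stays responsive.
def handle_info(:clean_inactive_respondents, state) do
  case Queue.inactive_respondent_ids(state.channel_id, limit: 50) do
    [] ->
      {:noreply, state}

    respondent_ids ->
      state = deactivate_contacts(state, respondent_ids)
      send(self(), :clean_inactive_respondents)
      {:noreply, state}
  end
end
```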
We could spawn a Task to run the checks in parallel and have a new message to handle each response individually (e.g. `:remove_inactive_respondent`) 🤔
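A sketch of that Task variant (`:remove_inactive_respondent` as floated above; everything else is hypothetical):

```elixir
def handle_info(:clean_inactive_respondents, state) do
  broker = self()

  # Run the DB check off-process so the broker keeps serving its mailbox.
  Task.start(fn ->
    for id <- find_inactive_respondent_ids(state) do
      send(broker, {:remove_inactive_respondent, id})
    end
  end)

  {:noreply, state}
end

def handle_info({:remove_inactive_respondent, respondent_id}, state) do
  {:noreply, deactivate_contact(state, respondent_id)}
end
```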
Co-authored-by: Matías García Isaía <[email protected]>
I think we could improve the naming of some models, and there are some rough edges to polish, but overall the PR is OK 👍
This allows resuming operations as if (almost) nothing had happened when Surveda restarts.
The state is now saved to the database on each operation on the ChannelBrokerState, instead of being kept fully in memory. If Surveda crashes or needs to be restarted, we don't lose anything, and surveys no longer hang because the survey broker had activated enough respondents but the queue was lost to a restart. That should also reduce memory usage, at the expense of more interaction with MySQL.
Improves the test suite with tests for the Ask.Runtime.ChannelBrokerState module to ascertain its individual behavior. The behavior changed in a few places to accommodate the new design while remaining valid if we change the data store (again).
I replaced the default Ecto.Adapters.SQL.Sandbox pool used in the test suite (Ecto's default) with a DatabaseCleaner solution. The sandbox only allows a single connection to be checked out for transactional tests, and that was breaking the test suite: there are long transactions in SurveyBroker and Session that call into the ChannelBroker process, which got blocked because it failed to checkout the unique connection (already checked out by another process in a blocking manner). The DatabaseCleaner solution is only marginally slower, and could be faster by using the MyXQL driver directly.
At first I relied on the primary key (`channel_id` and `respondent_id`), but that failed quickly in the test suite, which tries to queue the same respondent multiple times (😅). I fixed the test suite by adopting an UPSERT strategy: whenever there is a conflict, the new contact will deactivate and replace the existing contact in the queue.

In theory this may be incorrect: replacing a queued contact with another queued contact seems fine (forget about the previous attempt), but deactivating an active contact may be overboard?
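A sketch of that strategy with Ecto, assuming the composite primary key acts as the implicit conflict target on MySQL:

```elixir
# On conflict, the new contact replaces everything but the key columns,
# effectively deactivating and superseding the previous attempt.
Repo.insert!(contact,
  on_conflict: {:replace_all_except, [:channel_id, :respondent_id]}
)
```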
In practice this may not be an issue: the runtime first asks the channel broker whether a message is already queued (`has_queued_message?`) or expired (``) before timing out an attempt (see `Ask.Runtime.SurveyBroker.retry_respondent/1` and `Ask.Runtime.Session.timeout/1`).